#include "async_thread_pool.h"

#include "report.h"

namespace afcore {
namespace log {

CAsyncThreadPool::CAsyncThreadPool(size_t q_max_items, size_t threads_n, std::function<void()> on_thread_start)
  : q_(q_max_items) {
  if (0 == threads_n || threads_n > 1000) {
    ThrowCLogeException("afcore::CAsyncThreadPool: invalid threads_n param (valid range is 1-1000)");
  }

  for (size_t i = 0; i < threads_n; ++i) {
    threads_.emplace_back([this, on_thread_start] {
      on_thread_start();
      this->CAsyncThreadPool::WorkerLoop();
    });
  }
}

CAsyncThreadPool::CAsyncThreadPool(size_t q_max_items, size_t threads_n)
  : CAsyncThreadPool(q_max_items, threads_n, []{} ) {
}

CAsyncThreadPool::~CAsyncThreadPool() {
  try {
    for (size_t i = 0; i < threads_.size(); ++i) {
      PostAsyncMsg(SAsyncLogMessage(EAsyncLogMessageType::kAsyncLogMessageType_Terminate),
        EAsyncOverflowPolicy::kAsyncOverflowPolicy_Block);
    }

    for (auto& t : threads_) {
      t.join();
    }
  } catch (...) {
  }
}

void CAsyncThreadPool::PostLog(RAsyncLoggerSptr&& worker_ptr, const SLogMessage& msg, EAsyncOverflowPolicy overflow_policy) {
  SAsyncLogMessage async_msg(std::move(worker_ptr), EAsyncLogMessageType::kAsyncLogMessageType_Log, msg);
  PostAsyncMsg(std::move(async_msg), overflow_policy);
}

void CAsyncThreadPool::PostFlush(RAsyncLoggerSptr &&worker_ptr, EAsyncOverflowPolicy overflow_policy) {
  PostAsyncMsg(SAsyncLogMessage(std::move(worker_ptr), EAsyncLogMessageType::kAsyncLogMessageType_Flush), overflow_policy);
}

size_t CAsyncThreadPool::GetOverrunCount() {
  return q_.GetOverrunCount();
}

void CAsyncThreadPool::PostAsyncMsg(SAsyncLogMessage &&new_msg, EAsyncOverflowPolicy overflow_policy) {
  if (EAsyncOverflowPolicy::kAsyncOverflowPolicy_Block == overflow_policy) {
    q_.Enqueue(std::move(new_msg));
  } else {
    q_.EnqueueNoWait(std::move(new_msg));
  }
}

void CAsyncThreadPool::WorkerLoop() {
  while (ProcessNextMsg()) {}
}

bool CAsyncThreadPool::ProcessNextMsg() {
  SAsyncLogMessage incoming_async_msg;
  bool dequeued = q_.Dequeue(incoming_async_msg, kAsyncThreadPoolProcessMsgCd);
  if (!dequeued) {
    return true;
  }

  switch (incoming_async_msg.msg_type) {
    case EAsyncLogMessageType::kAsyncLogMessageType_Log:
      {
        incoming_async_msg.worker_ptr->BackendAppenderDoLog(incoming_async_msg);
        return true;
      }
      break;
    case EAsyncLogMessageType::kAsyncLogMessageType_Flush:
      {
        incoming_async_msg.worker_ptr->BackendDoFlush();
        return true;
      }
      break;
    case EAsyncLogMessageType::kAsyncLogMessageType_Terminate:
      {
        return false;
      }
      break;
    default:
      {
        DBG_ASSERT(false, "async type error %u", incoming_async_msg.msg_type);
      }
  }
  return true;
}

} // !namespace log

using namespace log;

template class AFCORE_COMMON_API CMpmcQueue<SAsyncLogMessage>;

} // !namespace afcore